In [14]:
#Create a Kafka consumer with the correct credentials 
# Also need to set deserializer

from kafka import KafkaConsumer
import json


consumer = KafkaConsumer(
    client_id="client1",
    bootstrap_servers=f"kafka-4da1624-wssitu-05f0.aivencloud.com:19068",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    value_deserializer = lambda v: json.loads(v.decode('ascii')),
    key_deserializer = lambda v: json.loads(v.decode('ascii')),
    max_poll_records = 10
)
In [15]:
# Use topics method to see all topics
consumer.topics()
Out[15]:
{'Pizza'}
In [16]:
topic_name = "Pizza"
In [17]:
# subscribe to a list of topics, in this case only "Pizza"
consumer.subscribe(topics=["Pizza"])
In [18]:
# Check what you are subscribed to 
consumer.subscription()
Out[18]:
{'Pizza'}
In [ ]:
# Use for loop to continuously print messages from the Kafka topic
for message in consumer:
    print("Partition is ",message.partition, "Offset is ",message.offset, "key is ",message.key, "value is ",message.value)
Partition is  0 Offset is  67 key is  {'id': 1} value is  {'name': 'John', 'pizza': 'Cheese'}
Partition is  0 Offset is  68 key is  {'id': 2} value is  {'name': 'Steve', 'pizza': 'Pepperoni'}
Partition is  0 Offset is  69 key is  {'id': 3} value is  {'name': 'Kate', 'pizza': 'Steak'}
Partition is  0 Offset is  70 key is  {'id': 4} value is  {'name': 'Jason', 'pizza': 'Chicken'}
Partition is  0 Offset is  71 key is  {'id': 5} value is  {'name': 'Kevin', 'pizza': 'Donaid'}